package com.atguigu.flink.datastreamapi.source;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;

/**
 * Created by Smexy on 2023/11/11

    KafkaSource，flink充当消费者的角色。

        Kafka中存储的数据都是byte[] ----> 读数据byte[] ----> ConsumerRecord(byte [] K,byte []V) ----->反序列化
            一般情况下，只获取ConsumerRecord的Value部分。
            一般情况下，ConsumerRecord的Value是 jsonString。

   ------------------------
    如何找到消费者相关的参数配置名?
         所有消费者相关的参数名都封装在一个类 ConsumerConfig

    --------------------
        在flink中，只要构造的类的构造器不是public，99%的情况，无法new。
            大部分都是使用建造者模式，让你去构造对象

    ---------------------
         <OUT> KafkaSourceBuilder<OUT> builder(): 泛型方法。
                在调用时  xxx.<泛型类型>方法名()
            OUT读取kafka以后，流中的数据类型。

    ---------------------
        消费者是一个线程，可以消费一个或多个partition。
        多个消费者可以设置一个组，会组内的规则的制约。同组中，一个partition只能被一个消费者消费。
            共享偏移量信息，有新的消费者加入时，会触发reblance


    ---------------------------
        一般情况下，kafka消费者，需要把上次读取的位置信息(offsets)，保存到kafka的一个__consumer_offsets中。
            下次启动，读取__consumer_offsets，就知道上次消费的位置，从上次消费的位置向后消费。
        消费策略: earliest ,latest ，只影响第一次消费。

        状态： 计算期间，要保存的一些变量。
        flink消费kakfa，把上次读取的位置信息(offsets)存储在算子的状态(内存)中，下次程序启动时，通过读取状态，知道上次读取的位置。
            正常操作，需要把算子的状态，备份到外部的持久化设备(HDFS),后续会讲到。

 */
public class Demo4_KafkaSource
{
    public static void main(String[] args) {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(1);

        //如何找到kafka的指定主题
        KafkaSource<String> kafkaSource = KafkaSource
            .<String>builder()
            .setBootstrapServers("hadoop102:9092,hadoop103:9092")  //指定kafka集群地址
            .setTopics("t1") //消费的主题，99%都是读一个主题
            //.setGroupId("test1")  //消费者组
            //.setStartingOffsets(OffsetsInitializer.earliest()) //读取策略
            //从kakfa这个组提交过的位置向后消费，如果这个组从来没有提交过offsets，就是一个新的组，从头消费
            .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST))
            //只把ConsumerRecored的value部分，反序列化 byte[] ---> xxx
            .setValueOnlyDeserializer(new SimpleStringSchema())  //设置反序列化器
            .setProperty(ConsumerConfig.GROUP_ID_CONFIG,"test3")  //设置一些额外的消费者参数信息
            .setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"true")  //允许把读取的位置自动提交给kafka保存
            .setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000") //每间隔1000ms提交一次
            .build();

        env.fromSource(kafkaSource,WatermarkStrategy.noWatermarks(),"kakfa")
            .print();



        try {
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }

    }
}
